package com.huan.flink.sideoutput;

import com.huan.flink.dto.Animal;
import com.huan.flink.dto.Chicken;
import com.huan.flink.dto.Duck;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * 需求： 根据数据的type进行判断，转换成不同的对象，分别做处理
 * 实现：通过 side output 侧输出流来实现
 *
 * @author huan.fu
 * @date 2023/9/23 - 10:01
 */
public class SideOutputStreamApplication {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<Animal> ds = environment.fromElements(
                        new Animal("1", "鸭子"),
                        new Animal("1", "小鸭子"),
                        new Animal("2", "三黄鸡"),
                        new Animal("2", "大公鸡"),
                        new Animal("3", "其他动物")
                )
                .map(a -> a);

        /**
         * 定义一个侧输出流标签
         * 第一个参数：标签名
         * 第二个参数： 放入侧输出流中的数据的数据类型
         */
        OutputTag<Duck> duckOutputTag = new OutputTag<>("duck", Types.POJO(Duck.class));
        OutputTag<Chicken> chickenOutputTag = new OutputTag<>("chicken", Types.POJO(Chicken.class));

        /**
         * process方法参数
         * 第一个参数：ProcessFunction 处理方法
         * 第二个参数： 主流中的数据类型
         */
        SingleOutputStreamOperator<Animal> os = ds.process(new ProcessFunction<Animal, Animal>() {
            @Override
            public void processElement(Animal animal, ProcessFunction<Animal, Animal>.Context ctx, Collector<Animal> out) throws Exception {
                switch (animal.getType()) {
                    case "1":
                        Duck duck = new Duck(animal.getType(), animal.getName());
                        // 将 Duck 对象放入 侧输出流中
                        ctx.output(duckOutputTag, duck);
                        break;
                    case "2":
                        Chicken chicken = new Chicken(animal.getType(), animal.getName());
                        // 将 Chicken 对象放入侧输出流中
                        ctx.output(chickenOutputTag, chicken);
                        break;
                    default:
                        // 将 Animal 对象放入主流中
                        out.collect(animal);
                        break;
                }
            }
        }, Types.POJO(Animal.class));

        // 获取Duck侧输出流
        SideOutputDataStream<Duck> duckOs = os.getSideOutput(duckOutputTag);
        // 获取Chicken侧输出流
        SideOutputDataStream<Chicken> chickenOs = os.getSideOutput(chickenOutputTag);

        duckOs.print("duck 侧输出流");
        chickenOs.print("chicken 侧输出流");
        os.print("主流");

        environment.execute("分流");
    }
}
